#pragma once

#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcomm/mq_log.hpp"
#include "../mqcomm/mq_helper.hpp"
#include "../mqcomm/mq_msg.pb.h"
#include "../mqcomm/mq_proto.pb.h"
#include "../mqcomm/mq_threadpool.hpp"

#include "mq_consumer.hpp"
#include "mq_host.hpp"
#include "mq_route.hpp"
namespace ns_kkymq
{
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
    using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
    using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
    using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
    using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
    using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
    using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
    using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;
    using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
    using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
    using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
    using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;

    class Channel
    {
    public:
        using ptr = std::shared_ptr<Channel>;
        Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp,
                const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
            : _cid(id), _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool)
        {
            lg(Debug, "new Channel: %p", this);
        }

        ~Channel()
        {
            if (_consumer.get() != nullptr)
            {
                _cmp->remove(_consumer->_tag, _consumer->_q_name);
            }
            lg(Debug, "delete Channel: %p", this);
        }
        // 交换机的声明与删除
        void declareExchange(const declareExchangeRequestPtr &req)
        {
            bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(),
                                              req->durable(), req->auto_delete(), req->args());
            basicResponse(ret, req->rid(), req->cid());
        }
        void deleteExchange(const deleteExchangeRequestPtr &req)
        {
            _host->deleteExchange(req->exchange_name());
            basicResponse(true, req->rid(), req->cid());
        }
        // 队列的声明与删除
        void declareQueue(const declareQueueRequestPtr &req)
        {
            bool ret = _host->declareQueue(req->queue_name(), req->durable(), req->excludsive(),
                                           req->auto_delete(), req->args());
            if (ret == false)
            {
                return basicResponse(false, req->rid(), req->cid());
            }

            // 初始化队列消费者管理句柄
            _cmp->initQueueConsumer(req->queue_name());

            basicResponse(true, req->rid(), req->cid());
        }
        void deleteQueue(const deleteQueueRequestPtr &req)
        {
            // 销毁队列
            _cmp->destroyQueueConsumer(req->queue_name());
            _host->deleteQueue(req->queue_name());
            basicResponse(true, req->rid(), req->cid());
        }
        // 队列的绑定与解除绑定
        void queueBind(const queueBindRequestPtr &req)
        {
            bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());
            basicResponse(ret, req->rid(), req->cid());
        }
        void QueueUnBind(const queueUnBindRequestPtr &req)
        {
            _host->unBind(req->exchange_name(), req->queue_name());
            basicResponse(true, req->rid(), req->cid());
        }
        // 消息的发布
        void basicPublish(const basicPublishRequestPtr &req)
        {
            // 1.找到这个交换机
            Exchange::ptr ep = _host->selectExchange(req->exchange_name());
            if (ep.get() == nullptr)
            {
                // 没有找到该交换机
                return basicResponse(false, req->rid(), req->cid());
            }
            // 2.进行交换路由，判断消息可以发布到交换机绑定的哪个队列中
            MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());
            BasicProperties *properties = nullptr;
            std::string routing_key;
            if (req->has_properties())
            {
                properties = req->mutable_properties();
                routing_key = properties->routing_key();
            }
            for (auto &binding : mqbm)
            {
                if (Router::route(ep->_type, routing_key, binding.second->_binding_key))
                {
                    // 3.需要将消息添加到队列中（添加消息的管理）
                    _host->basicPublish(binding.first, properties, req->body());
                    // 4.向线程池中添加一个消息消费任务（向指定队列的订阅者去推送消息,线程池完成）
                    auto task = std::bind(&Channel::consume, this, binding.first);
                    _pool->push(task);
                }
            }
            return basicResponse(true, req->rid(), req->cid());
        }
        // 消息的确认
        void basicAck(const basicAckRequestPtr &req)
        {
            _host->basicAck(req->queue_name(), req->message_id());
            return basicResponse(true, req->rid(), req->cid());
        }
        // 订阅队列消息
        void basicConsume(const basicConsumeRequestPtr &req)
        {
            // 1.判断队列是否存在
            bool ret = _host->existsQueue(req->queue_name());
            if (ret == false)
            {
                return basicResponse(false, req->rid(), req->cid());
            }
            // 2.创建队列的消费者
            auto func = std::bind(&Channel::callback, this, std::placeholders::_1,
                                  std::placeholders::_2, std::placeholders::_3);
            // 创建消费者之后，当前的channel角色就是个消费者
            // lg(Debug,"我发送给%s 一个tag:%s",req->queue_name().c_str(),req->consumer_tag().c_str());
            _consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), func);
            return basicResponse(true, req->rid(), req->cid());
        }
        // 取消订阅
        void basicCancel(const basicCancelRequestPtr &req)
        {
            _cmp->remove(req->consumer_tag(), req->queue_name());
        }

    private:
        void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
        {
            // 针对参数组织处推送消息请求，将消息推送给channel对应客户端
            basicConsumeResponse resp;
            resp.set_cid(_cid);
            resp.set_body(body);
            resp.set_consumer_tag(tag);
            if (bp)
            {
                resp.mutable_properties()->set_id(bp->id());
                resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                resp.mutable_properties()->set_routing_key(bp->routing_key());
            }
            _codec->send(_conn, resp);
        }
        void consume(const std::string &q_name)
        {
            // 指定队列消费消息
            // 1.从队列中取出一条消息
            MessagePtr mp = _host->basicComsume(q_name);
            if (mp.get() == nullptr)
            {
                lg(Debug, "执行消费任务失败,%s队列没有消息!", q_name.c_str());
                return;
            }
            // 2.从队列订阅者中取出一个订阅者
            Consumer::ptr cp = _cmp->choose(q_name);
            if (cp.get() == nullptr)
            {
                lg(Debug, "执行消费任务失败,%s队列没有消费者!", q_name.c_str());
                return;
            }
            // 3.调用订阅者对应的消息处理函数，实现消息的推送
            lg(Debug,"当前订阅者的队列名:%s, tag:%s",cp->_q_name.c_str(),cp->_tag.c_str());
            cp->_callback(cp->_tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
            // 4.如果订阅者是自动确认———不需要等待确认，直接删除消息，否则需要等到外部收到确认后再删除
            if (cp->_auto_ack)
            {
                _host->basicAck(q_name, mp->payload().properties().id());
            }
        }

        void basicResponse(bool ok, const std::string &rid, const std::string &cid)
        {
            basicCommonResponse resp;
            resp.set_rid(rid);
            resp.set_cid(cid);
            resp.set_ok(ok);
            _codec->send(_conn, resp);
        }

    private:
        std::string _cid;
        Consumer::ptr _consumer;
        muduo::net::TcpConnectionPtr _conn;
        ProtobufCodecPtr _codec;
        ConsumerManager::ptr _cmp;
        VirtualHost::ptr _host;
        threadpool::ptr _pool;
    };

    class ChannelManager
    {
    public:
        using ptr = std::shared_ptr<ChannelManager>;
        ChannelManager()
        {
        }

        bool openChannel(const std::string &id, const VirtualHost::ptr &host,
                         const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec,
                         const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
        {
            std::unique_lock<std::mutex> lock(_mtx);
            auto it = _channels.find(id);
            if (it != _channels.end())
            {
                lg(Debug, "信道:%s 已经存在!", id.c_str());
                return false;
            }
            auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);
            _channels.insert({id, channel});
            return true;
        }

        void closeChannel(const std::string &id)
        {
            std::unique_lock<std::mutex> lock(_mtx);
            _channels.erase(id);
        }

        Channel::ptr getChannel(const std::string &id)
        {
            std::unique_lock<std::mutex> lock(_mtx);
            auto it = _channels.find(id);
            if (it == _channels.end())
            {
                return Channel::ptr();
            }
            return it->second;
        }

    private:
        std::mutex _mtx;
        std::unordered_map<std::string, Channel::ptr> _channels;
    };
}
